package com.kool.kmqtt.server.parser;

import com.kool.kmqtt.server.exception.ErrorCode;
import com.kool.kmqtt.server.exception.ProtocolException;
import com.kool.kmqtt.server.packet.FixedHeader;
import com.kool.kmqtt.server.packet.PublishPayload;
import com.kool.kmqtt.server.packet.PublishVariableHeader;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

/**
 * PUBLISH报文解析器
 */
@Slf4j
public class PublishPacketParser extends PacketParser {
    public PublishPacketParser(FixedHeader fixedHeader) {
        super(fixedHeader);
    }

    @Override
    public void parseVariableHeader(ByteBuf in) {
        PublishVariableHeader variableHeader = new PublishVariableHeader();
        byte topicNameLengthMSB = in.readByte();
        byte topicNameLengthLSB = in.readByte();
        int topicNameLength = (Byte.toUnsignedInt(topicNameLengthMSB) << 8) + Byte.toUnsignedInt(topicNameLengthLSB);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        for (int i = 0; i < topicNameLength; i++) {
            byteArrayOutputStream.write(in.readByte());
        }
        int variableHeaderLength = 2 + topicNameLength;
        //主题
        try {
            String topicName = new String(byteArrayOutputStream.toByteArray(), "utf-8");
            variableHeader.setTopicName(topicName);
        } catch (UnsupportedEncodingException e) {
            throw new ProtocolException(ErrorCode.CHARSET_NOT_UTF8, e);
        }

        int qos = packet.getFixedHeader().getQoS();
        if (qos > 0) {
            byte packetIdMSB = in.readByte();
            byte packetIdLSB = in.readByte();
            //报文标识
            int packetId = (Byte.toUnsignedInt(packetIdMSB) << 8) + Byte.toUnsignedInt(packetIdLSB);
            variableHeader.setPacketId(packetId);
            packet.setPacketId(packetId);
            variableHeaderLength += 2;
        }
        packet.setVariableHeaderLength(variableHeaderLength);
        packet.setVariableHeader(variableHeader);
    }

    @Override
    public void parsePayload(ByteBuf in) {
        ByteArrayOutputStream payloadBytesArray = new ByteArrayOutputStream();
        for (int i = 0; i < packet.getFixedHeader().getRemainingLength() - packet.getVariableHeaderLength(); i++) {
            payloadBytesArray.write(in.readByte());
        }
        byte[] payloadBytes = payloadBytesArray.toByteArray();
        PublishPayload payload = new PublishPayload();
        payload.setPayload(payloadBytes);
        packet.setPayload(payload);

        log.debug("PUBLISH报文载荷UTF8解码：{}", new String(payloadBytes, StandardCharsets.UTF_8));
    }
}
